// Copyright (c) Sean Lawlor
//
// This source code is licensed under both the MIT license found in the
// LICENSE-MIT file in the root directory of this source tree.

//! Output ports for publish-subscribe notifications between actors
//!
//! This notion extends beyond traditional actors in that it is a publish-subscribe
//! mechanism we've added in `ractor`. Output ports are ports which can have messages published
//! to them which are automatically forwarded to downstream actors waiting for inputs. They optionally
//! have a message transformer attached to them to convert them to the appropriate message type.
//!
//! There are two different implementations. If the feature `output-port-v2` is not enabled,
//! the implementation uses a broadcast channel that buffers at most 10 successively sent messages
//! for each subscribed actor. That means that if up to 10 messages are sent to the output port
//! successively, there is a high probability that every subscriber receives all of them; beyond
//! that, a subscriber that falls behind may miss messages.
//!
//! The new implementation, enabled with the `output-port-v2` feature, uses a fan-out task that
//! distributes each message to all subscribers, ensuring that every message is received by every
//! subscriber.

use crate::ActorRef;
use crate::Message;

#[cfg(test)]
mod tests;

/// Marker trait for messages that can be published on an output port.
///
/// A published message is handed to every subscriber, so on top of the base [Message]
/// constraints it must also be [Clone].
pub trait OutputMessage: Message + Clone {}

/// Blanket implementation: every cloneable [Message] is automatically an [OutputMessage].
impl<T> OutputMessage for T where T: Message + Clone {}

#[cfg(not(feature = "output-port-v2"))]
pub use v1::OutputPort;

#[cfg(feature = "output-port-v2")]
pub use v2::OutputPort;

#[cfg(not(feature = "output-port-v2"))]
mod v1 {
    use std::fmt::Debug;
    use std::sync::RwLock;

    use tokio::sync::broadcast as pubsub;

    use crate::concurrency::JoinHandle;
    use crate::{ActorRef, Message, OutputMessage};

    /// A publish-subscribe endpoint used to connect actors together.
    ///
    /// An [OutputPort] lets an actor emit messages without knowing which downstream actors are
    /// listening. A downstream actor subscribes with its [ActorRef] plus a converter that maps
    /// the published type to the actor's own input type.
    ///
    /// A subscription ends on its own when forwarding to the subscribed actor fails, and all
    /// subscriptions end when the output port itself is dropped.
    pub struct OutputPort<TMsg: OutputMessage> {
        /// Sending half of the broadcast channel every subscription listens on.
        tx: pubsub::Sender<Option<TMsg>>,
        /// Subscriptions created so far; finished ones are pruned on the next `subscribe` call.
        subscriptions: RwLock<Vec<OutputPortSubscription>>,
    }

    impl<TMsg> Debug for OutputPort<TMsg>
    where
        TMsg: OutputMessage,
    {
        /// Renders the port as `OutputPort(<name of the published message type>)`.
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            f.write_fmt(format_args!(
                "OutputPort({})",
                std::any::type_name::<TMsg>()
            ))
        }
    }

    impl<TMsg: OutputMessage> Default for OutputPort<TMsg> {
        /// Builds an output port with no subscriptions.
        fn default() -> Self {
            // A subscription task only holds a message long enough to forward it to the input
            // port of the receiving actor, so a buffer of 10 messages should be plenty.
            let (sender, _initial_receiver) = pubsub::channel(10);
            Self {
                tx: sender,
                subscriptions: RwLock::new(Vec::new()),
            }
        }
    }

    impl<TMsg> OutputPort<TMsg>
    where
        TMsg: OutputMessage,
    {
        /// Subscribe to the output port, passing in a converter to convert to the input message
        /// of another actor
        ///
        /// Subscriptions whose forwarding task has already finished are pruned as part of
        /// this call.
        ///
        /// * `receiver` - The reference to the actor which will receive forwarded messages
        /// * `converter` - The converter which will convert the output message type to the
        ///   receiver's input type and return [Some(_)] if the message should be forwarded, [None]
        ///   if the message should be skipped.
        pub fn subscribe<TReceiverMsg, F>(&self, receiver: ActorRef<TReceiverMsg>, converter: F)
        where
            F: Fn(TMsg) -> Option<TReceiverMsg> + Send + 'static,
            TReceiverMsg: Message,
        {
            // Create the subscription (which spawns its forwarding task) BEFORE taking the
            // lock: should spawning panic, the subscription list is not left poisoned and
            // the lock is held for the bookkeeping only.
            let sub = OutputPortSubscription::new::<TMsg, F, TReceiverMsg>(
                self.tx.subscribe(),
                converter,
                receiver,
            );

            // The list is only ever pruned and appended to, so it stays consistent even if a
            // previous holder of the lock panicked: recover the guard instead of propagating
            // the poison as a panic to every later subscriber.
            let mut subs = self
                .subscriptions
                .write()
                .unwrap_or_else(std::sync::PoisonError::into_inner);

            // filter out dead subscriptions, since they're no longer valid
            subs.retain(|existing| !existing.is_dead());
            subs.push(sub);
        }

        /// Send a message on the output port
        ///
        /// The message is dropped when the port currently has no receiver, and a failure to
        /// hand it to the channel is ignored.
        ///
        /// * `msg`: The message to send
        pub fn send(&self, msg: TMsg) {
            if self.tx.receiver_count() > 0 {
                let _ = self.tx.send(Some(msg));
            }
        }
    }

    // ============== Subscription implementation ============== //

    /// The output port's subscription handle. It holds the [JoinHandle] of the task
    /// which listens to the [pubsub::Receiver] to see if there's a new message, and if there is
    /// forwards it to the [ActorRef] asynchronously using the specified converter.
    struct OutputPortSubscription {
        /// Handle of the spawned forwarding task; the subscription counts as dead once
        /// this task has finished.
        handle: JoinHandle<()>,
    }

    impl OutputPortSubscription {
        /// Determine if the subscription is dead
        pub(crate) fn is_dead(&self) -> bool {
            self.handle.is_finished()
        }

        /// Create a new subscription
        pub(crate) fn new<TMsg, F, TReceiverMsg>(
            mut port: pubsub::Receiver<Option<TMsg>>,
            converter: F,
            receiver: ActorRef<TReceiverMsg>,
        ) -> Self
        where
            TMsg: OutputMessage,
            F: Fn(TMsg) -> Option<TReceiverMsg> + Send + 'static,
            TReceiverMsg: Message,
        {
            let handle = crate::concurrency::spawn(async move {
                while let Ok(Some(msg)) = port.recv().await {
                    if let Some(new_msg) = converter(msg) {
                        if receiver.cast(new_msg).is_err() {
                            // kill the subscription process, as the forwarding agent is stopped
                            return;
                        }
                    }
                }
            });

            Self { handle }
        }
    }
}

#[cfg(feature = "output-port-v2")]
mod v2 {
    use crate::{ActorId, ActorRef, Message, OutputMessage};
    use std::fmt::Debug;

    /// A publish-subscribe endpoint used to connect actors together.
    ///
    /// An [OutputPort] lets an actor emit messages without knowing which downstream actors are
    /// listening. A downstream actor subscribes with its [ActorRef] plus a converter that maps
    /// the published type to the actor's own input type.
    ///
    /// A subscription is dropped once delivering to its actor fails, and all subscriptions
    /// are dropped when the output port itself is dropped.
    pub struct OutputPort<TMsg: OutputMessage> {
        /// The port keyed by [ActorId] that performs the actual fan-out.
        inner: inner::OutputPort<ActorId, TMsg>,
    }

    impl<TMsg> Debug for OutputPort<TMsg>
    where
        TMsg: OutputMessage,
    {
        /// Renders the port as `OutputPort(<name of the published message type>)`.
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            f.write_fmt(format_args!(
                "OutputPort({})",
                std::any::type_name::<TMsg>()
            ))
        }
    }

    impl<TMsg: OutputMessage> Default for OutputPort<TMsg> {
        /// Builds an output port on top of the inner port's default configuration.
        fn default() -> Self {
            let port = inner::OutputPort::default();
            Self { inner: port }
        }
    }

    impl<TMsg: OutputMessage> OutputPort<TMsg> {
        /// Subscribe an actor to the output port, using a converter to build the actor's
        /// input message from each published message
        ///
        /// * `receiver` - The reference to the actor which will receive forwarded messages
        /// * `converter` - Converts the output message type to the receiver's input type;
        ///   returning [Some(_)] forwards the message, returning [None] skips it.
        pub fn subscribe<TReceiverMsg, F>(&self, receiver: ActorRef<TReceiverMsg>, converter: F)
        where
            TReceiverMsg: Message,
            F: Fn(TMsg) -> Option<TReceiverMsg> + Send + 'static,
        {
            let port = &self.inner;
            port.subscribe(receiver, converter);
        }

        /// Publish a message on the output port
        ///
        /// * `msg`: The message to publish
        pub fn send(&self, msg: TMsg) {
            let port = &self.inner;
            port.send(msg);
        }
    }

    mod inner {

        use super::OutputMessage;
        use crate::concurrency::{mpsc_unbounded, MpscUnboundedSender};
        //use crate::concurrency::{mpsc_unbounded, oneshot, MpscUnboundedSender, OneshotSender};
        use crate::{ActorId, ActorRef, DerivedActorRef, Message};

        #[cfg(feature = "tokio_runtime")]
        /// As we do a lot of iterations without calling any async
        /// method while dispatching messages, we consume 1 tokio
        /// task budget unit every `CONSUME_BUDGET_FACTOR` messages sent.
        const CONSUME_BUDGET_FACTOR: u32 = 32;
        /// Each subscriber may receive a batch of up to `MAX_BATCH_SIZE` messages
        /// before another batch is sent to another subscriber.
        const MAX_BATCH_SIZE: usize = 32;

        /// Requests queued on an output port and handled by its fan-out task.
        enum OutportMessage<Id, TMsg> {
            /// A published message to deliver to the subscribers.
            Data(TMsg),
            /// A subscriber to register. The payload is an `Option` so that the fan-out task
            /// can `take()` the subscriber out of a queued request in place; a request that
            /// holds `None` is ignored.
            SetSubscriber(Option<Box<dyn Subscriber<Id, TMsg>>>),
            //RemoveSubscriber(Id),
            //Subscribers(OneshotSender<Vec<Id>>),
        }

        /// A destination the fan-out task can deliver published messages to.
        pub(super) trait Subscriber<Id, TMsg: OutputMessage>: Send + 'static {
            /// Delivers `value` to the subscriber.
            ///
            /// Returns `false` if the subscriber should be removed.
            fn send(&self, value: &TMsg) -> bool;
            /// The identifier the subscriber is registered under.
            fn id(&self) -> Id;
        }

        /// Handle to an output port: a thin wrapper around the unbounded sender that queues
        /// `OutportMessage` requests for the port's fan-out task.
        #[derive(Debug, Clone)]
        pub(super) struct OutputPort<Id, TMsg>(MpscUnboundedSender<OutportMessage<Id, TMsg>>);

        impl<Id, TMsg> Default for OutputPort<Id, TMsg>
        where
            Id: Send + Sync + Clone + PartialEq + 'static,
            TMsg: OutputMessage,
        {
            /// Creates a port that accepts several subscriptions for the same id.
            fn default() -> Self {
                let allow_duplicate_subscription = true;
                Self::new(allow_duplicate_subscription)
            }
        }

        impl<Id: Send + 'static + PartialEq + Clone + Sync, TMsg: OutputMessage> OutputPort<Id, TMsg> {
            /// Creates an output port and spawns the fan-out task that serves it.
            ///
            /// The task owns the subscriber list. It takes requests off the port's queue in
            /// batches of at most `MAX_BATCH_SIZE` and stops once `recv_many` reports that
            /// nothing more can be received.
            ///
            /// * `allow_duplicate_subscription` - when `true`, every subscription is appended to
            ///   the subscriber list even if its id is already registered; when `false`, a
            ///   subscription whose id is already registered replaces the previous one.
            pub(super) fn new(allow_duplicate_subscription: bool) -> Self {
                let (tx, mut rx) = mpsc_unbounded::<OutportMessage<Id, TMsg>>();

                crate::concurrency::spawn(async move {
                    let mut subscribers = Vec::<(Id, Box<dyn Subscriber<Id, TMsg>>)>::new();
                    let mut batch = Vec::new();
                    // NB: This algorithm may look overcomplicated but it improves the OutputPort
                    // benchmark by 70% compared to a trivial algorithm: the duration of sending
                    // messages to the output port is divided by 3.
                    loop {
                        let l = rx.len().clamp(1, MAX_BATCH_SIZE);
                        if rx.recv_many(&mut batch, l).await == 0 {
                            break;
                        }

                        let mut i = 0;
                        let mut coop_count = 0u32;
                        // First pass: iterate over the subscribers already present and send
                        // each of them all the messages of the batch.
                        'subs: while i < subscribers.len() {
                            // We process every message of the batch, but only apply the
                            // subscription changes that affect the current subscriber. The
                            // aim is to preserve the ordering that a process sending to the
                            // output port may expect.
                            for msg in batch.iter_mut() {
                                match msg {
                                    OutportMessage::Data(v) => {
                                        if !subscribers[i].1.send(v) {
                                            // Delivery failed: drop the subscriber. The next
                                            // one shifts into slot `i`, so `i` is not advanced.
                                            subscribers.remove(i);
                                            continue 'subs;
                                        } else {
                                            coop_count = coop_count.wrapping_add(1);
                                            #[cfg(feature = "tokio_runtime")]
                                            if coop_count % CONSUME_BUDGET_FACTOR == 0 {
                                                tokio::task::coop::consume_budget().await
                                            }
                                        }
                                    }
                                    OutportMessage::SetSubscriber(opt_subscriber) => {
                                        // When duplicates are allowed, a new subscription never
                                        // alters an existing subscriber, so it is left in the
                                        // batch for the second pass, which registers it at its
                                        // position in the message sequence. Appending it here
                                        // would grow `subscribers` while this pass is still
                                        // running, and the new entry would then be replayed the
                                        // whole batch, including the data published before it
                                        // subscribed.
                                        if allow_duplicate_subscription {
                                            continue;
                                        }

                                        // Only a subscription carrying the id of the current
                                        // subscriber concerns it.
                                        match opt_subscriber {
                                            Some(subscriber)
                                                if subscriber.id() == subscribers[i].0 => {}
                                            _ => continue,
                                        }

                                        // Ids are unique when duplicates are disallowed, so the
                                        // entry registered under this id is the current one:
                                        // the new subscription overrides it and receives the
                                        // rest of the batch in its place.
                                        if let Some(subscriber) = opt_subscriber.take() {
                                            subscribers[i].1 = subscriber;
                                        }
                                    }
                                }
                            }
                            i += 1;
                        }

                        let i0 = i;
                        // Second pass: for the subscribers added by this batch we switch back
                        // to a less efficient algorithm, iterating first over the messages and
                        // then over the subscribers, to ensure the expected ordering.
                        for msg in batch.drain(..) {
                            match msg {
                                OutportMessage::Data(v) => {
                                    // We do not want a reference to `dyn Subscriber` to be held
                                    // across an await, otherwise `Subscriber` would need to be
                                    // `Sync`. So we iterate by index. This also simplifies the
                                    // removal of subscribers.
                                    let mut i = i0;
                                    while i < subscribers.len() {
                                        if !subscribers[i].1.send(&v) {
                                            subscribers.remove(i);
                                        } else {
                                            i += 1;
                                            #[cfg(feature = "tokio_runtime")]
                                            if coop_count % CONSUME_BUDGET_FACTOR == 0 {
                                                tokio::task::coop::consume_budget().await
                                            }
                                            coop_count = coop_count.wrapping_add(1);
                                        }
                                    }
                                }
                                OutportMessage::SetSubscriber(Some(subscriber)) => {
                                    let sid = subscriber.id();

                                    // We ensure there is no duplicate subscription
                                    if !allow_duplicate_subscription {
                                        if let Some((_, prev_subscriber)) =
                                            subscribers.iter_mut().find(|(id, _)| id == &sid)
                                        {
                                            // In case of duplication, the previous subscription
                                            // is overridden
                                            *prev_subscriber = subscriber;
                                        } else {
                                            subscribers.push((subscriber.id(), subscriber));
                                        }
                                    } else {
                                        subscribers.push((subscriber.id(), subscriber));
                                    }
                                }
                                // Already applied during the first pass.
                                OutportMessage::SetSubscriber(None) => (),
                            }
                        }
                    }
                });

                Self(tx)
            }

            /// Queues `value` for delivery to the subscribers.
            ///
            /// A failure to queue the message is ignored.
            pub(super) fn send(&self, value: TMsg) {
                _ = self.0.send(OutportMessage::Data(value));
            }
        }

        impl<TMsg: OutputMessage> OutputPort<ActorId, TMsg> {
            pub(super) fn subscribe<TReceiverMsg, F>(
                &self,
                receiver: ActorRef<TReceiverMsg>,
                converter: F,
            ) where
                F: Fn(TMsg) -> Option<TReceiverMsg> + Send + 'static,
                TReceiverMsg: Message,
            {
                self.set_subscriber_with_filter(receiver, move |msg| converter(msg.clone()))
            }

            pub(super) fn set_subscriber_with_filter<R: ActorReference>(
                &self,
                actor_ref: R,
                filter: impl Fn(&TMsg) -> Option<R::Msg> + Send + 'static,
            ) {
                _ = self
                    .0
                    .send(OutportMessage::SetSubscriber(Some(Box::new(Filtering {
                        actor_ref,
                        filter,
                    }))));
            }
        }

        impl<T, U> Subscriber<ActorId, T> for ActorRef<U>
        where
            T: OutputMessage,
            U: Message + TryFrom<T>,
        {
            /// Converts a copy of `value` into the actor's message type and sends it.
            ///
            /// A message that cannot be converted is skipped and the subscriber is kept;
            /// `false` is returned only when sending to the actor fails.
            fn send(&self, value: &T) -> bool {
                match U::try_from(value.clone()) {
                    Ok(converted) => self.send_message(converted).is_ok(),
                    Err(_) => true,
                }
            }

            /// The subscriber is identified by the id of its actor.
            fn id(&self) -> ActorId {
                self.get_id()
            }
        }
        impl<T> Subscriber<ActorId, T> for DerivedActorRef<T>
        where
            T: OutputMessage,
        {
            /// Sends a copy of `value` to the actor; `false` means the send failed.
            fn send(&self, value: &T) -> bool {
                let copy = value.clone();
                self.send_message(copy).is_ok()
            }

            /// The subscriber is identified by the id of its actor.
            fn id(&self) -> ActorId {
                self.get_id()
            }
        }
        /// Pairs an actor reference with the filter applied to each published message
        /// before anything is sent to that actor.
        struct Filtering<T, F> {
            /// Destination of the messages produced by the filter.
            pub actor_ref: T,
            /// Maps a published message to the message to send, or to `None` to skip it.
            pub filter: F,
        }
        impl<T, U, F> Subscriber<ActorId, U> for Filtering<T, F>
        where
            T: ActorReference,
            U: OutputMessage,
            F: Fn(&U) -> Option<T::Msg> + Send + 'static,
        {
            /// Runs the filter on `value` and sends the result, if any, to the actor.
            ///
            /// A filtered-out message keeps the subscriber alive; otherwise the outcome of the
            /// send decides.
            fn send(&self, value: &U) -> bool {
                match (self.filter)(value) {
                    Some(accepted) => self.actor_ref.send_message(accepted),
                    None => true,
                }
            }

            /// The subscriber is identified by the id of the wrapped actor reference.
            fn id(&self) -> ActorId {
                self.actor_ref.id()
            }
        }
        /// Common view over the actor reference types a subscription can send to.
        pub(super) trait ActorReference: Send + Sync + 'static {
            /// The message type accepted by the referenced actor.
            type Msg: Message;
            /// Sends `value` to the actor; the implementations in this module return `true`
            /// when the send succeeded.
            fn send_message(&self, value: Self::Msg) -> bool;
            /// The id of the referenced actor.
            fn id(&self) -> ActorId;
        }
        impl<T> ActorReference for ActorRef<T>
        where
            T: Message,
        {
            type Msg = T;

            /// Sends `value` to the actor and reports whether the send succeeded.
            fn send_message(&self, value: T) -> bool {
                // This calls the actor reference's own `send_message`, whose result-like
                // return value is collapsed into a success flag.
                let outcome = self.send_message(value);
                outcome.is_ok()
            }

            /// The id of the referenced actor.
            fn id(&self) -> ActorId {
                self.get_id()
            }
        }
        impl<T> ActorReference for DerivedActorRef<T>
        where
            T: Message,
        {
            type Msg = T;

            /// Sends `value` to the actor and reports whether the send succeeded.
            fn send_message(&self, value: T) -> bool {
                // This calls the derived reference's own `send_message`, whose result-like
                // return value is collapsed into a success flag.
                let outcome = self.send_message(value);
                outcome.is_ok()
            }

            /// The id of the referenced actor.
            fn id(&self) -> ActorId {
                self.get_id()
            }
        }
    }
}

/// Represents a boxed `ActorRef` subscriber capable of handling messages from a
/// publisher via an `OutputPort`, employing a publish-subscribe pattern to
/// decouple message broadcasting from handling. For a subscriber `ActorRef` to
/// function as an `OutputPortSubscriber<T>`, its message type must implement
/// `From<T>` to convert the published message type to its own message format.
///
/// # Example
/// ```
/// // First, define the publisher's message types, including a variant for
/// // subscribing `OutputPortSubscriber`s and another for publishing messages:
/// use ractor::{
///     cast,
///     port::{OutputPort, OutputPortSubscriber},
///     Actor, ActorProcessingErr, ActorRef, Message,
/// };
///
/// enum PublisherMessage {
///     Publish(u8),                         // Message type for publishing
///     Subscribe(OutputPortSubscriber<u8>), // Message type for subscribing an actor to the output port
/// }
///
/// #[cfg(feature = "cluster")]
/// impl Message for PublisherMessage {
///     fn serializable() -> bool {
///         false
///     }
/// }
///
/// // In the publisher actor's `handle` function, handle subscription requests and
/// // publish messages accordingly:
///
/// struct Publisher;
/// struct State {
///     output_port: OutputPort<u8>,
/// }
///
/// #[cfg_attr(feature = "async-trait", ractor::async_trait)]
/// impl Actor for Publisher {
///     type State = State;
///     type Msg = PublisherMessage;
///     type Arguments = ();
///
///     async fn pre_start(
///         &self,
///         _myself: ActorRef<Self::Msg>,
///         _: (),
///     ) -> Result<Self::State, ActorProcessingErr> {
///         Ok(State {
///             output_port: OutputPort::default(),
///         })
///     }
///
///     async fn handle(
///         &self,
///         _myself: ActorRef<Self::Msg>,
///         message: Self::Msg,
///         state: &mut Self::State,
///     ) -> Result<(), ActorProcessingErr> {
///         match message {
///             PublisherMessage::Subscribe(subscriber) => {
///                 // Subscribes the `OutputPortSubscriber` wrapped actor to the `OutputPort`
///                 subscriber.subscribe_to_port(&state.output_port);
///             }
///             PublisherMessage::Publish(value) => {
///                 // Broadcasts the `u8` value to all subscribed actors, which will handle the type conversion
///                 state.output_port.send(value);
///             }
///         }
///         Ok(())
///     }
/// }
///
/// // The subscriber's message type demonstrates how to transform the publisher's
/// // message type by implementing `From<T>`:
///
/// #[derive(Debug)]
/// enum SubscriberMessage {
///     Handle(String), // Subscriber's intent for message handling
/// }
///
/// #[cfg(feature = "cluster")]
/// impl Message for SubscriberMessage {
///     fn serializable() -> bool {
///         false
///     }
/// }
///
/// impl From<u8> for SubscriberMessage {
///     fn from(value: u8) -> Self {
///         SubscriberMessage::Handle(value.to_string()) // Converts u8 to String
///     }
/// }
///
/// // To subscribe a subscriber actor to the publisher and broadcast a message:
/// struct Subscriber;
/// #[cfg_attr(feature = "async-trait", ractor::async_trait)]
/// impl Actor for Subscriber {
///     type State = ();
///     type Msg = SubscriberMessage;
///     type Arguments = ();
///
///     async fn pre_start(
///         &self,
///         _myself: ActorRef<Self::Msg>,
///         _: (),
///     ) -> Result<Self::State, ActorProcessingErr> {
///         Ok(())
///     }
///
///     async fn handle(
///         &self,
///         _myself: ActorRef<Self::Msg>,
///         message: Self::Msg,
///         _state: &mut Self::State,
///     ) -> Result<(), ActorProcessingErr> {
///         Ok(())
///     }
/// }
/// async fn example() {
///     let (publisher_actor_ref, publisher_actor_handle) =
///         Actor::spawn(None, Publisher, ()).await.unwrap();
///     let (subscriber_actor_ref, subscriber_actor_handle) =
///         Actor::spawn(None, Subscriber, ()).await.unwrap();
///
///     publisher_actor_ref
///         .send_message(PublisherMessage::Subscribe(Box::new(subscriber_actor_ref)))
///         .unwrap();
///
///     // Broadcasting a message to all subscribers
///     publisher_actor_ref
///         .send_message(PublisherMessage::Publish(123))
///         .unwrap();
///
///     publisher_actor_handle.await.unwrap();
///     subscriber_actor_handle.await.unwrap();
/// }
/// ```
///
/// The alias itself is a boxed [`OutputPortSubscriberTrait`] object, which lets a subscriber
/// be passed around (for instance inside a message, as in the example above) without naming
/// its concrete type.
pub type OutputPortSubscriber<InputMessage> = Box<dyn OutputPortSubscriberTrait<InputMessage>>;
/// Implemented by anything that can register itself as a subscriber of an [OutputPort]
/// publishing messages of type `I`.
pub trait OutputPortSubscriberTrait<I: Message + Clone>: Send {
    /// Subscribe to the given output port
    fn subscribe_to_port(&self, port: &OutputPort<I>);
}

impl<I, O> OutputPortSubscriberTrait<I> for ActorRef<O>
where
    O: Message + From<I>,
    I: Message + Clone,
{
    /// Subscribes this actor to `port`; every published message is converted with
    /// `O::from` and always forwarded.
    fn subscribe_to_port(&self, port: &OutputPort<I>) {
        let receiver = self.clone();
        port.subscribe(receiver, |published| Some(O::from(published)));
    }
}
